package org.example.react.magazinist;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

/**
 * 订阅者收到一个数字
 */
public class MagazineSubscriber implements Flow.Subscriber<Integer> {

    public static final String JACK = "Jack";
    public static final String PETE = "Pete";

    private final long sleepTime;

    private final String subscriberName;

    private Flow.Subscription subscription;

    private int nextMagazineExpected;

    private int totalRead;

    public MagazineSubscriber(long sleepTime, String subscriberName) {
        this.sleepTime = sleepTime;
        this.subscriberName = subscriberName;
        this.nextMagazineExpected = 1;
        this.totalRead = 0;
    }

    /**
     * Publisher 在被指定一个新的Subscriber时，调用此方法。
     * 一般来说需要在subscriber内部保存这个 subscription 实例，因为后面需要通过它向publisher发送信号来完成：请求更多数据，或者取消订阅。
     * @param subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * 新数据产生，这个方法会被调用。
     * @param item
     */
    @Override
    public void onNext(Integer item) {
        if (item != nextMagazineExpected) {
            IntStream.range(nextMagazineExpected, item).forEach(
                    i -> log(String.format("Oh no! I missed the magazine %d", i))
            );
            nextMagazineExpected = item;
        }
        log(String.format("Great! I got a new magazine: %d", item));
        takeSomeRest();
        nextMagazineExpected++;
        totalRead++;
    }

    /**
     * 当publisher出现异常时会调用subscriber的这个方法。
     * @param throwable
     */
    @Override
    public void onError(Throwable throwable) {
        log(String.format("Oops I got an error from the Publisher: %s", throwable.getMessage()));
    }

    /**
     * 当publisher数据推送完毕时会调用此方法，于是整个订阅过程结束。
     */
    @Override
    public void onComplete() {
        log("Finally! I completed the subscription, I got in total " +
                totalRead + " magazines.");
    }

    private void log(String message) {
        System.out.println("<=========== [" + subscriberName + "] : " + message);
    }

    public String getSubscriberName() {
        return subscriberName;
    }

    private void takeSomeRest() {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
